AWS GlueのIcebergテーブルに対して自動的にコンパクションを実行する機能を使ってみた
データアナリティクス事業本部 機械学習チームの鈴木です。
先日、Glueデータカタログ上からIceberg形式のGlueテーブルに対して自動でファイルのコンパクションを実行する機能のリリースがアナウンスされていたので試してみました。
例えば機械学習の文脈でも、データレイク上のIcebergテーブルから特徴量を取得し利用することがあります。『Amazon SageMaker Feature Store が Apache Iceberg テーブルフォーマットをサポートするように』のようなユースケースです。このアナウンスでも紹介されているように、小さなデータファイルをまとめて大きなファイルに置き換える仕組みを自動化する機能がGlueデータカタログでリリースされました。
自動的にコンパクションを実行する機能について
Icebergテーブルへのトランザクション書き込みによって生成された個々の小さなファイルを、自動的にいくつかの大きなファイルに圧縮する機能になります。
Athenaのガイドでは、Icebergテーブルの最適化について、現状OPTIMIZE
とVACUUM
の2つのメンテナンスコマンドが提供されています。
いずれも手動実行のためのコマンドになり、定期実行するためには専用のジョブを作成しスケジュールしておく必要があります。
今回リリースされた機能は、トランザクションにより作成された小さなファイルをGlueの機能で自動的に統合してくれるものとなります。
なお、先にOPTIMIZE
とVACUUM
のメンテナンスコマンドを引き合いに出しましたが、この機能はOPTIMIZE
に類似しているものの、記事執筆時点では厳密には異なる機能だと理解しています。
Amazon AthenaではIcebergテーブルの更新には、『Updating Iceberg table data - Amazon Athena』に記載のようにMerge-On-Read (MOR)を採用しています。更新により発生した削除データは、圧縮実行時にマージされます。
冒頭のブログの『Things to know』によると、この自動的なコンパクション機能では、削除ファイルはまとめないことが記載されていますが、OPTIMIZE
コマンドでは削除ファイルをデータファイルにまとめます。
Icebergテーブルの最適化のイメージについては、以下のブログ記事が参考になりました。
- Fine-Tuning Apache Iceberg Tables - Dremio Blog | Dremio
- Row-Level Changes on the Lakehouse: Copy-On-Write vs. Merge-On-Read in Apache Iceberg | Dremio
事前準備
では、自動的なコンパクション機能を試していきたいですが、そのためにまず、事前準備として検証に必要なリソースを作成しました。
1. Icebergテーブルの作成
コンパクションの対象とするIcebergテーブルを作成しました。
以下のようにフォーマットはPARQUET、SNAPPYの圧縮方式のIcebergテーブルを作成しました。特にデータを配置するS3バケット名
は自身のものに修正してください。
CREATE TABLE target_table_iceberg_parquet ( item_id string, item_value string) LOCATION 's3://データを配置するS3バケット名/target_table_iceberg_parquet' TBLPROPERTIES ( 'table_type'='iceberg', 'write_compression'='snappy', 'format'='PARQUET' );
続いて以下のようにしてデータを格納しておきました。
INSERT INTO target_table_iceberg_parquet (item_id, item_value) SELECT item_id, item_value FROM sample_table;
なお、この機能の対象となるIcebergテーブルの要件については以下のページをご確認ください。
ここで以下のようにテーブルが参照するファイルを確認すると、1ファイルであることが分かりました。
SELECT * FROM "target_table_iceberg_parquet$files"
2. IAMロールの作成
自動コンパクション設定時にGlueデータカタログで指定するIAMロールを作成しました。
以下のガイドを参考に作成しました。なお、今回はLakeFormation下の環境ではなく、ただのGlueデータカタログとS3バケットを使っているだけの環境を想定しています。
まず、以下のような許可ポリシーを作成しました。特にデータを配置するS3バケット名
・アカウントID
・Glueデータベース名
は自身のものに修正してください。リージョンは東京リージョンを想定しています。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:DeleteObject" ], "Resource": [ "arn:aws:s3:::データを配置するS3バケット名/*" ] }, { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::データを配置するS3バケット名" ] }, { "Effect": "Allow", "Action": [ "glue:UpdateTable", "glue:GetTable" ], "Resource": [ "arn:aws:glue:ap-northeast-1:アカウントID:table/Glueデータベース名/target_table_iceberg_parquet", "arn:aws:glue:ap-northeast-1:アカウントID:database/Glueデータベース名", "arn:aws:glue:ap-northeast-1:アカウントID:catalog" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "arn:aws:logs:ap-northeast-1:アカウントID:log-group:/aws-glue/iceberg-compaction/logs:*" } ] }
以下のようにポリシーの作成
からポリシーを作成しました。
次に、ロールを作成
から、Glueをユースケースに、このポリシーをアタッチしたIAMロールを作成しました。名前はもちろんなんでも構いませんが、後の説明で分かりやすいよう、cm-nayuts-glue-table-optimization-role
という名前にしました。
以下のようにIAMロールができました。
コンパクションの検証
次に、実際に小さいファイルを作成した後、自動的なコンパクションを設定して、どのようにデータファイルが圧縮されるか確認します。
1. 小さいファイルの作成
何回かトランザクションが実行されたと想定して、以下のような適当なUPDATE文を一つづつ実行しました。
UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='1'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='10'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='100'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='30'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='3331'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='70'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='500'; UPDATE target_table_iceberg_parquet SET item_value='updated' WHERE item_id='9995';
実行後、以下のようにテーブルが参照するファイルを確認すると、数が増えて9ファイルであることが分かりました。
SELECT * FROM "target_table_iceberg_parquet$files"
2. Glueテーブルでコンパクションの有効化・コンパクションの実行
Glueの画面から該当するGlueテーブルを開き、ページ下部のTable optimization
タブよりEnable compaction
ボタンをクリックしました。
Enable compaction
画面に遷移するので、IAM roleとして先ほど作成したIAMロールを選択し、Enable compaction
ボタンをクリックしました。
再度Glueテーブルを開くと、Compaction statusがEnabled
になっていたので少し待ちました。
さらにしばらくすると、Compaction statusがSuccess
になりました。
Compaction history
タブから履歴を確認することも可能でした。
3. 結果の確認
以下のようにテーブルが参照するファイルを確認すると、2ファイルであることが分かりました。
SELECT * FROM "target_table_iceberg_parquet$files"
関連機能の確認
1. メトリクスの確認
CloudWatchメトリクスより、コンパクションに使ったDPU数などのメトリクスを確認することができました。
2. ログの確認
IAMポリシーでも定義しましたが、CloudWatch Logsに実行したログが出力されました。
3. OPTIMIZEコマンドとの違い
コンパクション実行後のテーブルにOPTIMIZE
コマンドを実行して違いを確認してみました。
OPTIMIZE target_table_iceberg_parquet REWRITE DATA USING BIN_PACK
実行後にファイル件数を確認し、1ファイルであることが分かりました。
自動的なコンパクションでスキップされていたデータがまとめられたものと考えられます。
最後に
GlueがサポートしたIcebergテーブルに対する自動的なコンパクション機能について設定方法例や動きについてご紹介しました。Icebergテーブルへのサポートがますます充実してきており、AthenaおよびGlueでぜひ採用したくなります!
参考になりましたら幸いです。
参考
APIからも利用できるアナウンスがされていましたので、興味のある方はマネジメントコンソールだけでなく、AWS CLIや各種SDKからも利用できるかご確認ください。